{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "a93a69d8",
   "metadata": {},
   "source": [
    "# Chapter 8 - Scaling Up On Google Cloud"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0a57144a",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "import json"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "13d62f9c",
   "metadata": {},
   "source": [
    "## Step 1 and 2 - Data Acquisition, Standardization"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "153b3f27-e75f-4eab-9437-153e9c6d0567",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_m = pd.read_csv('gs://<your bucket>/handsonentityresolution/mari_clean.csv')\n",
    "df_c = pd.read_csv('gs://<your bucket>/handsonentityresolution/basic_clean.csv')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "dbdbdb21",
   "metadata": {},
   "source": [
    "## Step 3 - Record Blocking and Attribute Comparison"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d7d0e6cc-4081-4e6b-9b67-8d44e4468f24",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import types\n",
    "\n",
    "conf = SparkConf()\n",
    "\n",
    "# This parallelism setting for cluster of 2 n2-standard-4\n",
    "conf.set(\"spark.default.parallelism\", \"240\")\n",
    "conf.set(\"spark.sql.shuffle.partitions\", \"240\")\n",
    "\n",
    "sc = SparkContext.getOrCreate(conf=conf)\n",
    "spark = SparkSession(sc)\n",
    "spark.sparkContext.setCheckpointDir(\"gs://<your bucket>/handsonentityresolution/\")\n",
    "\n",
    "# Register the jaro winkler custom udf\n",
    "spark.udf.registerJavaFunction(\n",
    "    \"jaro_winkler_similarity\", \"uk.gov.moj.dash.linkage.JaroWinklerSimilarity\", types.DoubleType()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "36785aea-bfda-4191-8227-907dd3f5d679",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import StructType, StructField, StringType, IntegerType\n",
    "\n",
    "schema = StructType([StructField(\"Postcode\", StringType()), StructField(\"CompanyName\", StringType()),StructField(\"unique_id\", IntegerType())])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "29d9db2a-8e61-496d-b5d9-67da39b4478c",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_c = df_c[['Postcode','CompanyName','unique_id']]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c7f38424-7d52-4b87-bc84-1b871d700e55",
   "metadata": {},
   "outputs": [],
   "source": [
    "dfs_m = spark.createDataFrame(df_m, schema)\n",
    "dfs_c = spark.createDataFrame(df_c, schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c8b87afb-2b5d-491f-8641-c91366408cbe",
   "metadata": {},
   "outputs": [],
   "source": [
    "import splink.spark.comparison_library as cl\n",
    "\n",
    "settings = {\n",
    "    \"link_type\": \"link_only\",\n",
    "    \"blocking_rules_to_generate_predictions\": [\n",
    "        \"l.Postcode = r.Postcode\",\n",
    "        \"l.CompanyName = r.CompanyName\",\n",
    "    ],\n",
    "    \"comparisons\": [\n",
    "        cl.jaro_winkler_at_thresholds(\"CompanyName\",[0.9,0.8]),\n",
    "    ],\n",
    "    \"retain_intermediate_calculation_columns\" : True,\n",
    "    \"retain_matching_columns\" : True,   \n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fd310707-0ecb-4d03-b4e8-77824ef29ab9",
   "metadata": {},
   "outputs": [],
   "source": [
    "from splink.spark.linker import SparkLinker\n",
    "linker = SparkLinker([dfs_m, dfs_c], settings, input_table_aliases=[\"dfs_m\", \"dfs_c\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5891c0e6-95c8-49de-b407-2618ccf553cd",
   "metadata": {},
   "outputs": [],
   "source": [
    "linker.estimate_u_using_random_sampling(max_pairs=5e7)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f84dc5e7-63c0-4cdb-a779-60571891205d",
   "metadata": {},
   "outputs": [],
   "source": [
    "linker.estimate_parameters_using_expectation_maximisation(\"l.Postcode = r.Postcode\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a8dad02d",
   "metadata": {},
   "outputs": [],
   "source": [
    "#linker.save_model_to_json(\"<your_path>/Chapter8_Splink_Settings.json\", overwrite=True)\n",
    "linker.load_model(\"<your_path>/Chapter8_Splink_Settings.json\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2f0edcd3",
   "metadata": {},
   "outputs": [],
   "source": [
    "linker.match_weights_chart()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a4beb452",
   "metadata": {},
   "outputs": [],
   "source": [
    "linker.m_u_parameters_chart()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "feae788a",
   "metadata": {},
   "source": [
    "# Step 4 - Match Classification"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bc057d6c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Calculate predictions\n",
    "\n",
    "df_pred = linker.predict(threshold_match_probability=0.1).as_pandas_dataframe()\n",
    "len(df_pred)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c92a3843",
   "metadata": {},
   "outputs": [],
   "source": [
    "len(pd.unique(df_pred['CompanyName_r']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "daa31d66-748d-4408-8a5b-ae1d899eab91",
   "metadata": {},
   "outputs": [],
   "source": [
    "postname = df_pred[(df_pred['CompanyName_l']==df_pred['CompanyName_r']) & (df_pred['Postcode_l']==df_pred['Postcode_r'])]\n",
    "len(postname)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1aca1f86-ba22-45d5-9641-84bca5c5e56c",
   "metadata": {},
   "outputs": [],
   "source": [
    "len(pd.unique(postname['CompanyName_r']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1ab0599e-5a03-4bd7-8c6f-b00dd5aff5be",
   "metadata": {},
   "outputs": [],
   "source": [
    "notname = df_pred[df_pred['CompanyName_l']!=df_pred['CompanyName_r']]\n",
    "len(notname)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "36a4f9d7-7188-4c20-825b-29878607b2cb",
   "metadata": {},
   "outputs": [],
   "source": [
    "len(pd.unique(notname['CompanyName_r']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b5451fb6-731f-4ec6-bfa8-1521522e848a",
   "metadata": {},
   "outputs": [],
   "source": [
    "notpost = df_pred[df_pred['Postcode_l']!=df_pred['Postcode_r']]\n",
    "len(notpost)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c003daf8-d295-4d67-be11-e620af788b92",
   "metadata": {},
   "outputs": [],
   "source": [
    "len(pd.unique(notpost['CompanyName_r']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3dfcc0e1-aca0-4c43-9ef2-31a472eefb8d",
   "metadata": {},
   "outputs": [],
   "source": [
    "results = df_m.merge(df_pred,left_on=['unique_id'], right_on=['unique_id_r'],how='left',\n",
    "          suffixes=('_m', '_p'))\n",
    "results[results['match_weight'].isnull()]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b99ab357-ce94-4b80-bf26-c3175ca48d86",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
